Spring Cloud Gateway 源码分析及应用

您所在的位置:网站首页 spring cloud gateway Spring Cloud Gateway 源码分析及应用

Spring Cloud Gateway 源码分析及应用

#Spring Cloud Gateway 源码分析及应用| 来源: 网络整理| 查看: 265

一、简介

Spring Cloud Gateway 是Spring Cloud 生态全新项目,基于Spring 5、Spring Boot 2.X、Project Reactor实现的API网关,旨在为微服务提供简单高效的API路由管理方法。 Spring Cloud Gateway 作为Spring Cloud 生态中的网关,目标是代替Zuul 1.X。Spring Cloud 2.X版本目前仍未对Zuul 2.X高性能版本进行集成,仍使用的是非Reactor的老版本Zuul网关。

目前Spring Cloud dependencies 最新版本Hoxton.SR8 仍使用的是Zuul 1.3.1 Zuul 2.x 高性能Reactor版本本身与18年5月开源,目前最新版本2.1.9

为了提高网关性能,Spring Cloud Gateway基于WebFlux框架实现,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。

1.1 术语 Route: Gateway的基本构建模块,由ID、目标URL、谓词集合和过滤器集合定义。 Predicate: Java8 Funciton Predicate,输入类型是 SpringFramework ServerWebExchange,可以匹配HTTP请求的所有内容,比如标头或参数。 Filter:使用特定工厂构造的Spring FrameworkGatewayFilter实例,可以在发送下游请求之前或之后修改请求或响应。 1.3 特性 动态路由:能够匹配任何请求属性; 可以对路由指定 Predicate(断言)和 Filter(过滤器); 集成Hystrix的断路器功能; 集成 Spring Cloud 服务发现功能; 易于编写的 Predicate(断言)和 Filter(过滤器); 请求限流功能; 支持路径重写 1.4 Spring Cloud Gateway与Spring Cloud Zuul Spring Cloud Zuul

Springcloud 2.x 版本到目前为止中所集成的Zuul版本(1.x),采用的是Tomcat容器,使用的是传统的Servlet IO处理模型。 servlet由servlet container进行生命周期管理。container启动时构造servlet对象并调用servlet init()进行初始化;container关闭时调用servlet destory()销毁servlet;container运行时接受请求,并为每个请求分配一个线程(一般从线程池中获取空闲线程)然后调用service()。 弊端:servlet是一个简单的网络IO模型,当请求进入servlet container时,servlet container就会为其绑定一个线程,在并发不高的场景下这种模型是适用的,但是一旦并发上升,线程数量就会上涨,而线程资源代价是昂贵的(上线文切换,内存消耗大)严重影响请求的处理时间。在一些简单的业务场景下,不希望为每个request分配一个线程,只需要1个或几个线程就能应对极大并发的请求,这种业务场景下servlet模型没有优势。

Zuul请求处理模型

所以Springcloud Zuul 是基于servlet之上的一个阻塞式处理模型,即spring实现了处理所有request请求的一个servlet(DispatcherServlet),并由该servlet阻塞式处理处理。所以Springcloud Zuul无法摆脱servlet模型的弊端。

Webflux模型

Webflux模式替换了旧的Servlet线程模型。用少量的线程处理request和response io操作,这些线程称为Loop线程,而业务交给响应式编程框架处理,响应式编程是非常灵活的,用户可以将业务中阻塞的操作提交到响应式框架的work线程中执行,而不阻塞的操作依然可以在Loop线程中进行处理,大大提高了Loop线程的利用率。官方结构图:

Webflux虽然可以兼容多个底层的通信框架,但是一般情况下,底层使用的还是Netty,毕竟,Netty是目前业界认可的最高性能的通信框架。而Webflux的Loop线程,正好就是著名的Reactor 模式IO处理模型的Reactor线程,如果使用的是高性能的通信框架Netty,这就是Netty的EventLoop线程。

1.5 如何集成Gateway

使用Gateway只需要简单引入依赖:

org.springframework.cloud spring-cloud-starter-gateway 二、工作原理

处理流程:

Gateway接受客户端请求; 网关处理程序映射确定请求与路由匹配,匹配成功则将其发送到网关Web处理程序; Web处理程序处理程序通过特定于请求的过滤器链运行请求: 请求经过 Filter 过滤器链,执行 pre 处理逻辑,如修改请求头信息等。 发出代理请求,请求被转发至下游服务并返回响应。 响应经过 Filter 过滤器链,执行 post 处理逻辑。 向客户端响应应答。

注意,在没有端口的路由中定义的URI,HTTP和HTTPS URI的默认端口值分别为80和443。

DispatcherHandler:所有请求的调度器,负载请求分发 public class DispatcherHandler implements WebHandler, ApplicationContextAware { @Nullable private List handlerMappings; @Nullable private List handlerAdapters; @Nullable private List resultHandlers; public DispatcherHandler() { } public DispatcherHandler(ApplicationContext applicationContext) { this.initStrategies(applicationContext); } @Nullable public final List getHandlerMappings() { return this.handlerMappings; } public void setApplicationContext(ApplicationContext applicationContext) { this.initStrategies(applicationContext); } # 初始、校验HandlerMapping并按order排序 protected void initStrategies(ApplicationContext context) { Map mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerMapping.class, true, false); ArrayList mappings = new ArrayList(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); this.handlerMappings = Collections.unmodifiableList(mappings); Map adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerAdapter.class, true, false); this.handlerAdapters = new ArrayList(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); Map beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerResultHandler.class, true, false); this.resultHandlers = new ArrayList(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); } //遍历handlerMappings ,根据exchange找到对应的handler // 对于Gateway 会找到对应的RoutePredicateHandlerMapping public Mono handle(ServerWebExchange exchange) { return this.handlerMappings == null ? this.createNotFoundError() : Flux.fromIterable(this.handlerMappings).concatMap((mapping) -> { return mapping.getHandler(exchange); }).next().switchIfEmpty(this.createNotFoundError())////如果遍历不到结果,则切换到错误处理 .flatMap((handler) -> { //通过HandlerAdapter调用handler, //gateway使用的 SimpleHandlerAdapter return this.invokeHandler(exchange, handler); }).flatMap((result) -> {//对响应进行处理 return this.handleResult(exchange, result); }); } private Mono createNotFoundError() { return Mono.defer(() -> { Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND, "No matching handler"); return Mono.error(ex); }); } private Mono invokeHandler(ServerWebExchange exchange, Object handler) { if (this.handlerAdapters != null) { Iterator var3 = this.handlerAdapters.iterator(); while(var3.hasNext()) { HandlerAdapter handlerAdapter = (HandlerAdapter)var3.next(); if (handlerAdapter.supports(handler)) { //调用handler的handle方法处理请求 return handlerAdapter.handle(exchange, handler); } } } return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler)); } //根据result获取对应的结果处理handler并处理结果 private Mono handleResult(ServerWebExchange exchange, HandlerResult result) { return this.getResultHandler(result).handleResult(exchange, result).checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]").onErrorResume((ex) -> { return result.applyExceptionHandler(ex).flatMap((exResult) -> { String text = "Exception handler " + exResult.getHandler() + ", error=\"" + ex.getMessage() + "\" [DispatcherHandler]"; return this.getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text); }); }); } private HandlerResultHandler getResultHandler(HandlerResult handlerResult) { if (this.resultHandlers != null) { Iterator var2 = this.resultHandlers.iterator(); while(var2.hasNext()) { HandlerResultHandler resultHandler = (HandlerResultHandler)var2.next(); if (resultHandler.supports(handlerResult)) { return resultHandler; } } } throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue()); } } RoutePredicateHandlerMapping:路由谓语匹配器,用于路由的查找,以及找到路由后返回对应的WebHandler,DispatcherHandler会依次遍历HandlerMapping集合进行处理 public class RoutePredicateHandlerMapping extends AbstractHandlerMapping { private final FilteringWebHandler webHandler; private final RouteLocator routeLocator; private final Integer managementPort; private final RoutePredicateHandlerMapping.ManagementPortType managementPortType; public RoutePredicateHandlerMapping(FilteringWebHandler webHandler, RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) { this.webHandler = webHandler; this.routeLocator = routeLocator; this.managementPort = getPortProperty(environment, "management.server."); this.managementPortType = this.getManagementPortType(environment); //设置排序字段1,此处的目的是Spring Cloud Gateway 的 GatewayWebfluxEndpoint 提供 HTTP API ,不需要经过网关 //它通过 RequestMappingHandlerMapping 进行请求匹配处理。RequestMappingHandlerMapping 的 order = 0 ,需要排在 RoutePredicateHandlerMapping 前面。所有,RoutePredicateHandlerMapping 设置 order = 1 。 this.setOrder(1); this.setCorsConfigurations(globalCorsProperties.getCorsConfigurations()); } private RoutePredicateHandlerMapping.ManagementPortType getManagementPortType(Environment environment) { Integer serverPort = getPortProperty(environment, "server."); if (this.managementPort != null && this.managementPort < 0) { return RoutePredicateHandlerMapping.ManagementPortType.DISABLED; } else { return this.managementPort != null && (serverPort != null || !this.managementPort.equals(8080)) && (this.managementPort == 0 || !this.managementPort.equals(serverPort)) ? RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT : RoutePredicateHandlerMapping.ManagementPortType.SAME; } } private static Integer getPortProperty(Environment environment, String prefix) { return (Integer)environment.getProperty(prefix + "port", Integer.class); } //设置mapping到上下文环境 protected Mono getHandlerInternal(ServerWebExchange exchange) { if (this.managementPortType == RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } else { exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR, this.getSimpleName()); // 查找路由 return this.lookupRoute(exchange).flatMap((r) -> { exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR); if (this.logger.isDebugEnabled()) { this.logger.debug("Mapping [" + this.getExchangeDesc(exchange) + "] to " + r); } //将查找到的路由设置到上下文环境 exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, r); //返回mapping对应的WebHandler即FilteringWebHandler return Mono.just(this.webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { //当前未找到路由时返回空,并移除GATEWAY_PREDICATE_ROUTE_ATTR exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR); if (this.logger.isTraceEnabled()) { this.logger.trace("No RouteDefinition found for [" + this.getExchangeDesc(exchange) + "]"); } }))); } } protected CorsConfiguration getCorsConfiguration(Object handler, ServerWebExchange exchange) { return super.getCorsConfiguration(handler, exchange); } private String getExchangeDesc(ServerWebExchange exchange) { StringBuilder out = new StringBuilder(); out.append("Exchange: "); out.append(exchange.getRequest().getMethod()); out.append(" "); out.append(exchange.getRequest().getURI()); return out.toString(); } //通过路由定位器获取路由信息 protected Mono lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes().concatMap((route) -> { return Mono.just(route).filterWhen((r) -> { exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return (Publisher)r.getPredicate().apply(exchange);//通过谓词过滤路由 }).doOnError((e) -> { this.logger.error("Error applying predicate for route: " + route.getId(), e); }).onErrorResume((e) -> { return Mono.empty(); }); }).next().map((route) -> { if (this.logger.isDebugEnabled()) { this.logger.debug("Route matched: " + route.getId()); } this.validateRoute(route, exchange); return route; }); } protected void validateRoute(Route route, ServerWebExchange exchange) { } protected String getSimpleName() { return "RoutePredicateHandlerMapping"; } public static enum ManagementPortType { DISABLED, SAME, DIFFERENT; private ManagementPortType() { } } } FilteringWebHandler : 使用Filter链表处理请求的WebHandler,RoutePredicateHandlerMapping找到路由后返回对应的FilteringWebHandler对请求进行处理,FilteringWebHandler负责组装Filter链表并调用链表处理请求。 # 通过过滤器处理web请求的处理器 public class FilteringWebHandler implements WebHandler { protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class); # 全局过滤器 private final List globalFilters; public FilteringWebHandler(List globalFilters) { this.globalFilters = loadFilters(globalFilters); } private static List loadFilters(List filters) { return (List)filters.stream().map((filter) -> { FilteringWebHandler.GatewayFilterAdapter gatewayFilter = new FilteringWebHandler.GatewayFilterAdapter(filter); if (filter instanceof Ordered) { int order = ((Ordered)filter).getOrder(); return new OrderedGatewayFilter(gatewayFilter, order); } else { return gatewayFilter; } }).collect(Collectors.toList()); } public Mono handle(ServerWebExchange exchange) { #获取请求上下文设置的路由实例 Route route = (Route)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); # 获取网关路由定义下的网关过滤器集合 List gatewayFilters = route.getFilters(); # 组合全局的过滤器与路由配置的过滤器,并将路由器定义的过滤器添加集合尾部 List combined = new ArrayList(this.globalFilters); combined.addAll(gatewayFilters); AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } # 创建过滤器链表对其进行链式调用 return (new FilteringWebHandler.DefaultGatewayFilterChain(combined)).filter(exchange); } private static class GatewayFilterAdapter implements GatewayFilter { private final GlobalFilter delegate; GatewayFilterAdapter(GlobalFilter delegate) { this.delegate = delegate; } public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { return this.delegate.filter(exchange, chain); } public String toString() { StringBuilder sb = new StringBuilder("GatewayFilterAdapter{"); sb.append("delegate=").append(this.delegate); sb.append('}'); return sb.toString(); } } private static class DefaultGatewayFilterChain implements GatewayFilterChain { private final int index; private final List filters; DefaultGatewayFilterChain(List filters) { this.filters = filters; this.index = 0; } private DefaultGatewayFilterChain(FilteringWebHandler.DefaultGatewayFilterChain parent, int index) { this.filters = parent.getFilters(); this.index = index; } public List getFilters() { return this.filters; } public Mono filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < this.filters.size()) { GatewayFilter filter = (GatewayFilter)this.filters.get(this.index); FilteringWebHandler.DefaultGatewayFilterChain chain = new FilteringWebHandler.DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); } }); } } } 2.2 Gateway类图

根据DispatcherHandler入口整理的Gateway类图

Spring Cloud Gateway的配置由一系列RouteDefinitionLocator实例驱动。以下清单显示了RouteDefinitionLocator接口的定义:

RouteDefinitionLocator.java public interface RouteDefinitionLocator { Flux getRouteDefinitions(); }

默认情况下,PropertiesRouteDefinitionLocator使用Spring Boot的@ConfigurationProperties机制来加载属性。

三、配置路由谓词工厂和网关过滤工厂 3.1 两种不同的配置路由方式

Gateway 提供了两种不同的方式用于配置路由,一种是通过yml文件来配置,另一种是通过Java Bean来配置。

通过yml文件来配置 service-url: user-service: http://localhost:8201 spring: cloud: gateway: routes: - id: path_route #路由的ID uri: ${service-url.user-service}/user/{id} #匹配后路由地址 predicates: # 断言,路径相匹配的进行路由 - Path=/user/{id} 通过Java Bean来配置 @Configuration public class GatewayConfig { @Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { return builder.routes() .route("path_route2", r -> r.path("/user/getByUsername") .uri("http://localhost:8201/user/getByUsername")) .build(); } } 3.2 Route Predicate 的使用

Spring Cloud Gateway将路由匹配作为Spring WebFluxHandlerMapping基础架构的一部分。Spring Cloud Gateway包括许多内置的路由谓词工厂。所有这些谓词都与HTTP请求的不同属性匹配。可以将多个路由谓词工厂与逻辑and语句结合使用。 Predicate 来源于 Java 8,是 Java 8 中引入的一个函数,Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非)。可以用于接口请求参数校验、判断新老数据是否有变化需要进行更新操作。 在 Spring Cloud Gateway 中 Spring 利用 Predicate 的特性实现了各种路由匹配规则,有通过 Header、请求参数等不同的条件来进行作为条件匹配到对应的路由。

下图为 Spring Cloud Gateway内置的几种常见谓词路由器:

3.2.1 根据datetime 匹配

After Route Predicate 在指定时间之后的请求会匹配该路由。

spring: cloud: gateway: routes: - id: after_route uri: ${service-url.user-service} predicates: - After=2019-09-24T16:30:00+08:00[Asia/Shanghai]

Before Route Predicate 在指定时间之前的请求会匹配该路由。

spring: cloud: gateway: routes: - id: before_route uri: ${service-url.user-service} predicates: - Before=2019-09-24T16:30:00+08:00[Asia/Shanghai]

Between Route Predicate 在指定时间区间内的请求会匹配该路由。

spring: cloud: gateway: routes: - id: before_route uri: ${service-url.user-service} predicates: - Between=2019-09-24T16:30:00+08:00[Asia/Shanghai], 2019-09-25T16:30:00+08:00[Asia/Shanghai] 3.2.2 根据Cookie匹配

带有指定Cookie的请求会匹配该路由。

spring: cloud: gateway: routes: - id: cookie_route uri: ${service-url.user-service} predicates: - Cookie=username,macro 3.2.3 Header Route Predicate

带有指定请求头的请求会匹配该路由。

spring: cloud: gateway: routes: - id: header_route uri: ${service-url.user-service} predicates: - Header=X-Request-Id, \d+ 3.2.4 Host Route Predicate

带有指定Host的请求会匹配该路由。

spring: cloud: gateway: routes: - id: host_route uri: https://example.org predicates: - Host=**.qt.com 3.2.5 Method Route Predicate

发送指定方法的请求会匹配该路由。

spring: cloud: gateway: routes: - id: method_route uri: ${service-url.user-service} predicates: - Method=GET 3.2.5 Path Route Predicate

发送指定路径的请求会匹配该路由。

spring: cloud: gateway: routes: - id: path_route uri: ${service-url.user-service}/user/{id} predicates: - Path=/user/{id} 3.2.6 Query Route Predicate

带指定查询参数的请求可以匹配该路由。

spring: cloud: gateway: routes: - id: query_route uri: ${service-url.user-service}/user/getByUsername predicates: - Query=username 3.2.7 RemoteAddr Route Predicate

从指定远程地址发起的请求可以匹配该路由。

spring: cloud: gateway: routes: - id: remoteaddr_route uri: ${service-url.user-service} predicates: - RemoteAddr=192.168.1.1/24 3.3 Route Filter 的使用

根据Gateway工作原理,我们知道Gateway实际是由路由匹配到的一系列Filter过滤链来处理请求的,Spring Cloud Gateway包括许多内置的GatewayFilter工厂。具体详情参考官网: https://docs.spring.io/spring-cloud-gateway/docs/2.2.5.RELEASE/reference/html/#gatewayfilter-factories

3.4 Global Filters 全局过滤器

当请求与路由匹配时,过滤Web处理程序会将的所有实例GlobalFilter和所有特定GatewayFilter于路由的实例添加到过滤器链中。该组合的过滤器链按org.springframework.core.Ordered接口排序,可以通过实现该getOrder()方法进行设置。 Spring Cloud Gateway区分了执行过滤器逻辑的“前”和“后”阶段,因此优先级最高的过滤器是“前”阶段的第一个,而“后”阶段的最后一个是优先级最低的一个。

例如,下面程序配置了一个过滤器链:

@Bean public GlobalFilter customFilter() { return new CustomGlobalFilter(); } public class CustomGlobalFilter implements GlobalFilter, Ordered { @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("custom global filter"); return chain.filter(exchange); } @Override public int getOrder() { return -1; } } 四、 结合注册中心和配置中心使用

Gateway会根据注册中心注册的服务列表,以服务名为路径创建动态路由。这里主要使用Nacos作为注册中心和配置中心

4.1 使用动态路由 4.1.1 基本配置

引入依赖

com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config org.springframework.cloud spring-cloud-starter-gateway

启用DiscoveryClient网关集成

# spring.cloud.gateway.discovery.locator.enabled=true spring: cloud: gateway: discovery: locator: enabled: true #开启从注册中心动态创建路由的功能 lower-case-service-id: true #使用小写服务名,默认是大写

使用网关访问服务:

C:\Users\liangbodlz\.ssh>curl 192.168.132.49:1500/nacos-provider/index Hello! 4.1.2 使用Route Predicate Factory过滤器实现通过指定path访问服务

在实际生产环境中,我们往往不会通过服务的application-name来访问服务,而是通过某个固定的url path来访问,比如xx.xxx/user/login,来访问用户服务的接口 通过Spring Cloud Gateway 内置 Path Route Predicate Factory 可以实现该目标:

spring: cloud: gateway: discovery: locator: enabled: true #开启从注册中心动态创建路由的功能 lower-case-service-id: true #使用小写服务名,默认是大写 routes: - id: nacos-provider uri: lb://nacos-provider predicates: - Path=/nprovider/** filters: - StripPrefix=1

使用指定path访问服务

C:\Users\liangbodlz\.ssh>curl 192.168.132.49:1500/nprovider/index Hello! C:\Users\liangbodlz\.ssh> 4.1.3 使用Nacos数据源动态加载和刷新路由配置

通常我们将微服务的Route Predicate Path和Gateway应用本身的配置放在一起,但是随着微服务的扩展,Route Predicate Path会逐渐增加导致Gateway 服务配置会变得臃肿,且Route Predicate Path配置会随着服务的增减进行变更,而更新的路由配置生效需要重启Gateway,这都是实际线上环境不可忍受的。因此独立管理Route Predicate Path配置且支持动态刷新配置变得必要起来。

基于上述需求,我们可以考虑将Gateway 路由配置存储到内存或者其他介质中。 从源码分析中可以知道Gateway路由配置信息由RouteDefinitionLocator 接口完成。

RouteDefinitionLocator实现关系 RouteDefinitionLocator 是Gateway路由配置读取的顶级接口,提供从缓存、配置文件、服务注册中心、组合等不同方式读取配置,以及提供RouteDefinitionRepository 接口方式对RouteDefinition进行增、删、查操作。要自定义路由配置实现可以考虑从上述接口着手实现。 这里主要基于Nacos配置中心+RouteDefinitionRepository 自定义路由配置加载,并参考,CachingRouteLocator实现路由配置的动态刷新 核心源码清单

//自定义路由配置加载核心接口 public interface RouteDefinitionRepository extends RouteDefinitionLocator, RouteDefinitionWriter { } //查询路由 public interface RouteDefinitionLocator { //返回自定义路由配置加载 Flux getRouteDefinitions(); } //路由增、删 public interface RouteDefinitionWriter { Mono save(Mono route); Mono delete(Mono routeId); } //动态路由刷新实现 public class CachingRouteLocator implements Ordered, RouteLocator, ApplicationListener, ApplicationEventPublisherAware { ....//省略 private ApplicationEventPublisher applicationEventPublisher; .....//省略 public void onApplicationEvent(RefreshRoutesEvent event) { try { this.fetch().collect(Collectors.toList()).subscribe((list) -> { Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe((signals) -> { this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this)); this.cache.put("routes", signals); }, (throwable) -> { this.handleRefreshError(throwable); }); }); } catch (Throwable var3) { this.handleRefreshError(var3); } } private void handleRefreshError(Throwable throwable) { if (log.isErrorEnabled()) { log.error("Refresh routes error !!!", throwable); } this.applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this, throwable)); }

代码实现

//实现RouteDefinitionRepository接口 package com.easy.mall.route; import com.alibaba.fastjson.JSONObject; import com.alibaba.nacos.api.exception.NacosException; import com.easy.mall.config.GatewayConfig; import com.easy.mall.operation.NacosConfigOperation; import com.easy.mall.operation.NacosSubscribeCallback; import com.google.common.collect.Lists; import java.util.List; import java.util.Optional; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.gateway.event.RefreshRoutesEvent; import org.springframework.cloud.gateway.route.RouteDefinition; import org.springframework.cloud.gateway.route.RouteDefinitionRepository; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** * @description: 基于Nacos配置中心实现Gateway 动态路由配置 * @author: liangbo * @create 2020-12-15 19:29 * @Version 1.0 **/ @Slf4j @DependsOn(value= {"gatewayConfig","nacosAutoConfiguration"}) @Configuration @ConditionalOnProperty(prefix = "global.gateway.dynamicRoute", name = "enabled", havingValue = "true") public class NacosDynamicRouteDefinitionRepository implements RouteDefinitionRepository { @Autowired private NacosConfigOperation nacosConfigOperation; @Autowired private ApplicationEventPublisher publisher; @Override public Flux getRouteDefinitions() { //从Nacos配置中心读取路由配置 try { String dynamicRouteStr = nacosConfigOperation.getConfig(GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.NACOS_ROUTE_DATA_ID); log.info("init dynamicRoute success.:{}", dynamicRouteStr); List routeDefinitions = Optional.ofNullable(dynamicRouteStr) .map(str -> JSONObject.parseArray(str, RouteDefinition.class)) .orElse(Lists.newArrayList()); return Flux.fromIterable(routeDefinitions); } catch (NacosException e) { log.error("load gateway dynamicRoute config error:{}", e); } return Flux.fromIterable(Lists.newArrayList()); } @Override public Mono save(Mono route) { return null; } @Override public Mono delete(Mono routeId) { return null; } /** * 侦听nacos config 实时刷新路由配置 */ @PostConstruct public void subscribeConfigRefresh() { try { nacosConfigOperation.subscribeConfig(GatewayConfig.NACOS_ROUTE_GROUP, GatewayConfig.NACOS_ROUTE_DATA_ID, null, new NacosSubscribeCallback () { @Override public void callback(String config) { publisher.publishEvent(new RefreshRoutesEvent(this)); } }); } catch (NacosException e) { log.error("nacos-addListener-error", e); } } }

动态路由配置清单

[ { "id": "easy-mall-auth", "predicates": [{ "name": "Path", "args": { "pattern": "/emallauth/**" } }], "uri": "lb://easy-mall-auth", "filters": [{ "name": "StripPrefix", "args": { "parts": "1" } }] } ] 4.2 基于网关+nacos配置中心实现灰度路由

实现思路见Nacos安装及Spring Cloud 集成 3.4

4.2.1 定义GatewayStrategyAutoConfiguration 网关路由自定义配置入口类 @Configuration @AutoConfigureBefore(RibbonClientConfiguration.class) //通过注解@RibbonClient声明附加配置,此处声明的配置会覆盖配置文件中的配置 @RibbonClients(defaultConfiguration = { GatewayStrategyLoadBalanceConfiguration.class }) @ConditionalOnProperty(value = StrategyConstant.SPRING_APPLICATION_STRATEGY_CONTROL_ENABLED, matchIfMissing = true) public class GatewayStrategyAutoConfiguration { //省略......

通过入口类,加载自定义全局过滤器、Ribbon自定义负载均衡配置、元数据处理适配器等。

自定义Ribbon 负载均衡实现 自定义Ribbon 负载均衡实现分别对PredicateBasedRule和ZoneAvoidanceRule进行了扩展

//通过注解@RibbonClient声明附加配置,此处声明的配置会覆盖配置文件中的配置 @RibbonClients(defaultConfiguration = { GatewayStrategyLoadBalanceConfiguration.class }) @Bean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, serviceId)) { return this.propertiesFactory.get(IRule.class, config, serviceId); } //开启和关闭Ribbon默认的ZoneAvoidanceRule负载均衡策略。一旦关闭,则使用RoundRobin简单轮询负载均衡策略。缺失则默认为true boolean zoneAvoidanceRuleEnabled = environment.getProperty(StrategyConstant.SPRING_APPLICATION_STRATEGY_ZONE_AVOIDANCE_RULE_ENABLED, Boolean.class, Boolean.TRUE); if (zoneAvoidanceRuleEnabled) { DiscoveryEnabledZoneAvoidanceRule discoveryEnabledRule = new DiscoveryEnabledZoneAvoidanceRule(); discoveryEnabledRule.initWithNiwsConfig(config); DiscoveryEnabledZoneAvoidancePredicate discoveryEnabledPredicate = discoveryEnabledRule.getDiscoveryEnabledPredicate(); discoveryEnabledPredicate.setPluginAdapter(pluginAdapter); discoveryEnabledPredicate.setDiscoveryEnabledAdapter(discoveryEnabledAdapter); return discoveryEnabledRule; } else { DiscoveryEnabledBaseRule discoveryEnabledRule = new DiscoveryEnabledBaseRule(); DiscoveryEnabledBasePredicate discoveryEnabledPredicate = discoveryEnabledRule.getDiscoveryEnabledPredicate(); discoveryEnabledPredicate.setPluginAdapter(pluginAdapter); discoveryEnabledPredicate.setDiscoveryEnabledAdapter(discoveryEnabledAdapter); return discoveryEnabledRule; } }

DiscoveryEnabledZoneAvoidanceRule:

ZoneAvoidanceRule扩展 DiscoveryEnabledBaseRule PredicateBasedRule

自定义全局过滤器 实现将网关路由配置以及Http Header加载到请求ServerWebExchange中

@Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 把ServerWebExchange放入ThreadLocal中 GatewayStrategyContext.getCurrentContext().setExchange(exchange); // 通过过滤器设置路由Header头部信息,并全链路传递到服务端 ServerHttpRequest.Builder requestBuilder = exchange.getRequest().mutate(); if (gatewayCoreHeaderTransmissionEnabled) { // 内置Header预先塞入 Map headerMap = strategyWrapper.getHeaderMap(); if (MapUtils.isNotEmpty(headerMap)) { for (Map.Entry entry : headerMap.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); GatewayStrategyFilterResolver.setHeader(requestBuilder, key, value, gatewayHeaderPriority); } } //获取网关配置的路由规则 String routeVersion = getRouteVersion(); String routeVersionWeight = getRouteVersionWeight(); String routeIdBlacklist = getRouteIdBlacklist(); String routeAddressBlacklist = getRouteAddressBlacklist(); if (StringUtils.isNotEmpty(routeVersion)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_VERSION, routeVersion, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeVersionWeight)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT, routeVersionWeight, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeIdBlacklist)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST, routeIdBlacklist, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } if (StringUtils.isNotEmpty(routeAddressBlacklist)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST, routeAddressBlacklist, gatewayHeaderPriority); } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST, gatewayHeaderPriority, gatewayOriginalHeaderIgnored); } } else { GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_VERSION_WEIGHT); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ID_BLACKLIST); GatewayStrategyFilterResolver.ignoreHeader(requestBuilder, DiscoveryConstant.N_D_ADDRESS_BLACKLIST); } // 对于服务A -> 网关 -> 服务B调用链 // 域网关下(zuulHeaderPriority=true),只传递网关自身的group,不传递服务A的group,起到基于组的网关端服务调用隔离 // 非域网关下(zuulHeaderPriority=false),优先传递服务A的group,基于组的网关端服务调用隔离不生效,但可以实现基于相关参数的熔断限流等功能 GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_GROUP, pluginAdapter.getGroup(), gatewayHeaderPriority); // 网关只负责传递服务A的相关参数(例如:serviceId),不传递自身的参数,实现基于相关参数的熔断限流等功能 GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_TYPE, pluginAdapter.getServiceType(), false); String serviceAppId = pluginAdapter.getServiceAppId(); if (StringUtils.isNotEmpty(serviceAppId)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_APP_ID, serviceAppId, false); } GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_ID, pluginAdapter.getServiceId(), false); GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_VERSION, pluginAdapter.getVersion(), false); GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.N_D_SERVICE_ENVIRONMENT, pluginAdapter.getEnvironment(), false); ServerHttpRequest newRequest = requestBuilder.build(); ServerWebExchange newExchange = exchange.mutate().request(newRequest).build(); ServerWebExchange extensionExchange = extendFilter(newExchange, chain); ServerWebExchange finalExchange = extensionExchange != null ? extensionExchange : newExchange; // 把新的ServerWebExchange放入ThreadLocal中 GatewayStrategyContext.getCurrentContext().setExchange(newExchange); String path = finalExchange.getRequest().getPath().toString(); if (path.contains(DiscoveryConstant.INSPECTOR_ENDPOINT_URL)) { GatewayStrategyFilterResolver.setHeader(requestBuilder, DiscoveryConstant.INSPECTOR_ENDPOINT_HEADER, pluginAdapter.getPluginInfo(null), true); } return chain.filter(finalExchange); }

自定义DefaultDiscoveryEnabledAdapter封装实例过滤规则 Ribbon负载均衡器执行默认过滤后会执行该规则

protected boolean apply(Server server) { if (discoveryEnabledAdapter == null) { return true; } return discoveryEnabledAdapter.apply(server); } //自定义过滤规则 @Override public boolean apply(Server server) { boolean enabled = applyEnvironment(server); if (!enabled) { return false; } enabled = applyVersion(server); if (!enabled) { return false; } enabled = applyIdBlacklist(server); if (!enabled) { return false; } enabled = applyAddressBlacklist(server); if (!enabled) { return false; } return applyStrategy(server); } 4.2.2 网关路由策略发布

基于nacos配置实现网关策略动态发布,根据网关元数据组以及serviceId创建路由策略配置:

网关路由策略配置

配置通过网关的请求都走版本xx

1.0

step1 启动网关以及2个服务实例

mvn spring-boot:run -Dspring-boot.run.arguments="--server.port=1100 --spring.cloud.nacos.discovery.metadata.version=1.0" mvn spring-boot:run -Dspring-boot.run.arguments="--server.port=1101 --spring.cloud.nacos.discovery.metadata.version=1.1" nacos实例 nacos服务实例

step2 通过网关调用服务,可以验证到请求始终访问到version为1.0 的服务实例

192.168.132.49:1500/nacos-provider/index

配置网关路由权重

1.0;1.1 1.0=90;1.1=10

灰度策略信息基于Nacos Client以及异步事件处理,动态更新,无需重启网关。 通过网关访问多次服务,请求基本按照9:1的比例命中服务。

配置IP地址和端口屏蔽策略,实现服务流量无损策略下线 服务下线场景中,由于Ribbon负载均衡组件存在着缓存机制,当被调用的服务实例已经下线,而调用的服务实例还暂时缓存着它,直到下个心跳周期才会把已下线的服务实例剔除,在此期间,会造成流量有损 框架提供流量的实时性的绝对无损。采用下线之前,把服务实例添加到屏蔽名单中,负载均衡不会去寻址该服务实例。 代码清单:

//省略 enabled = applyIdBlacklist(server); if (!enabled) { return false; } //省略 //过滤黑名单IP,框架会将黑名单中IP从Ribbon负载实例中移除 public boolean applyIdBlacklist(Server server) { String ids = pluginContextHolder.getContextRouteIdBlacklist(); if (StringUtils.isEmpty(ids)) { return true; } String serviceUUId = pluginAdapter.getServerServiceUUId(server); List idList = StringUtil.splitToList(ids, DiscoveryConstant.SEPARATE); if (idList.contains(serviceUUId)) { return false; } return true; }

配置Ip黑名单

//此处省略

发布配置后,访问服务可以发现请求屏蔽了端口为1100的服务,确保1100服务下线,请求不会命中到1100服务。 源码地址



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3